There is no shortage of Big Data applications and frameworks nowadays, and sometimes it may even seem that all niches have already been filled. That’s not how the creators of Apache Flink see it, though. Even though their project is not yet as well known as Spark or Hadoop, it has brought enough innovations to become a real game-changer in the world of Big Data.

In this article, I would like to introduce Apache Flink, describe its main features, and explain why it is different from other available solutions. I’ll end the article with an example of a simple stream processing application built with Flink.

This article was originally written for DZone.

What is Apache Flink?

Apache Flink (German for “quick”) is a Big Data framework for stream and batch data processing. It allows developers to build complex data processing pipelines and execute them in a distributed environment. Flink’s main selling point is that it provides first-class support for stream processing and implements batching as a special case.

Why is this a big deal? How can a streaming framework be a game-changer? The catch lies in the fact that some data depreciates rapidly: if it is not processed quickly, it becomes useless. Think about fraud detection or processing stock prices. Also, unbounded sources of data are now very common, and stream processing is a more natural fit for them.

Many existing systems were developed specifically for batch processing of data of a finite size and are not well suited for this kind of task. Flink, on the other hand, was designed with stream processing in mind.

To begin with, let me introduce the main features of Apache Flink:

  • Powerful computational model – with Apache Flink you can build data processing algorithms as a sequence of “pipeline” operations. If you are familiar with Apache Spark, you won’t be surprised by Flink’s API. To process data, you use instances of DataStream (for streaming) or DataSet (for batching) and apply transformations like map, filter, groupBy, etc. For example, the following snippet processes a stream of integers, filters out all odd numbers, and multiplies the remaining ones by 10:
DataStream<Integer> integerDataStream = ...;
integerDataStream
    .filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer i) throws Exception {
            return i % 2 == 0;
        }
    })
    .map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer i) throws Exception {
            return i * 10;
        }
    });
  • Batch and stream support – Flink allows you to use a single system for both batch and stream data processing. Batch processing looks very similar; the main difference from the developer’s perspective is that Flink provides a different set of operations for each mode (see the batch sketch after this list).
  • Java/Scala/Python support – while Flink is mostly implemented in Java, it allows you to write data processing applications in Java, Scala, and Python. It also integrates with Apache Zeppelin, a web-based notebook for data analytics.
  • Multiple execution environments – a Flink application can be executed in a distributed environment on YARN, Mesos, EC2, Google Cloud, or locally (for testing purposes).
  • Various input/output extensions – the Flink framework has multiple connectors for reading data (called sources) and writing result data (called sinks). These include queuing systems (Kafka, Kinesis, RabbitMQ, etc.), databases (HBase, JDBC, MongoDB, etc.), filesystems (HDFS, S3), and even some exotic options like the Twitter connector.
  • Powerful libraries – the Flink team has implemented a number of libraries on top of stream and batch processing, including graph processing (Gelly), complex event processing (CEP), and a machine learning library (ML).
  • SQL support – another library built on top of Flink that allows you to implement stream/batch processing by writing SQL queries.
  • Exactly-once processing – Flink provides exactly-once processing guarantees that allow accurate stateful computations on streams.
  • Fault-tolerance – Flink implements a lightweight checkpointing mechanism that allows it to process data accurately even in the case of a hardware failure.
  • Flow control – if you have a slow step in your processing pipeline, Flink will naturally apply backpressure to previous steps and the data source, causing them to produce new elements more slowly.
  • Vibrant community – in recent years, Flink developers have built a significant community around the project. Big companies like Alibaba and Zalando have started using it for Big Data processing, and the number of open-source contributors has tripled since last year.
  • Commercial support – the creators of Flink have founded a company called data Artisans that provides commercial support for Apache Flink.
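
As mentioned in the batch-and-stream item above, the batch API mirrors the streaming one. Here is a minimal sketch of the same filter-and-multiply pipeline written against the DataSet API; the fromElements source and the print sink are illustrative choices, not part of the original example:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// A small bounded data set instead of an unbounded stream
DataSet<Integer> integerDataSet = env.fromElements(1, 2, 3, 4, 5, 6);

integerDataSet
    // Keep only even numbers
    .filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer i) throws Exception {
            return i % 2 == 0;
        }
    })
    // Multiply the remaining numbers by 10
    .map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer i) throws Exception {
            return i * 10;
        }
    })
    // For DataSet programs, print() also triggers execution of the plan
    .print();

Other features from the list above are similarly easy to switch on; for instance, taking a checkpoint every five seconds in a streaming job is a single call like env.enableCheckpointing(5000) on the StreamExecutionEnvironment.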

Here is a bird’s-eye view of the Apache Flink architecture:

[Figure: Apache Flink architecture]

How is it different from Spark?

So far it may sound like Apache Flink is suspiciously similar to Apache Spark. You may even wonder why you would consider a less mature project over an older, well-established one like Spark. Despite the many similarities, Flink has several significant advantages:

  • True streaming – Spark started as a batch processing framework and implements streaming on top of it as a series of micro-batches. Flink, in turn, implements real-time streaming, meaning that individual elements are sent forward as soon as they are processed by the current “pipeline” step. This allows lower latency and more efficient flow control.
  • Memory management – Flink runs on top of the JVM and can be susceptible to common memory issues, such as long GC pauses. To avoid this, Flink implements custom memory management separate from the JVM’s, which significantly decreases GC overhead.
  • Event-time processing – when a streaming system processes an incoming event, it can consider either the time when the event originated (called event time) or the current system time (called processing time). Because some events can be delayed (e.g. by network issues), event-time processing may require handling elements in an order different from the one in which they arrived. Flink supports event-time processing and can handle out-of-order and late events (a short sketch follows this list).
  • Low latency and high performance – multiple benchmarks from data Artisans and Yahoo! suggest that Flink demonstrates impressive performance with huge amounts of data.
  • Savepoints – Flink allows you to stop your cluster, update the code you are running, and start it again, all without losing data.
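
To make the event-time item above more concrete, here is a minimal sketch of how timestamps and watermarks can be assigned to a stream. The Event class and its getTimestamp() method are hypothetical (they are not part of this article’s example), and allowing events to arrive up to ten seconds late is just an illustrative choice:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use the time embedded in events instead of the system clock
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> events = ...; // Event is a hypothetical POJO with a getTimestamp() method

events.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
    private long maxSeenTimestamp;

    @Override
    public long extractTimestamp(Event event, long previousElementTimestamp) {
        // The timestamp stored in the event itself is used for windowing
        maxSeenTimestamp = Math.max(maxSeenTimestamp, event.getTimestamp());
        return event.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Events more than 10 seconds behind the latest seen timestamp are considered late
        return new Watermark(maxSeenTimestamp - 10000);
    }
});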

Simple Flink example

Now let’s try to run Flink and implement a simple streaming application for it. Our application will read a stream of tweets and find the most used hashtag in every five-minute window.

First, we need to create a new Flink project. There is nothing special about it, and all we really need is a simple main function, but the Flink team has created a Maven archetype to generate a new Flink project:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.1.3 \
    -DgroupId=tweets-flink \
    -DartifactId=tweets-flink \
    -Dversion=1.0 \
    -Dpackage=com.example.flink.tweets

We need to add a Twitter connector that we will use to get a stream of tweets to process:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-twitter_2.10</artifactId>
	<version>${flink.version}</version>
</dependency>

To have a proper Flink application, we need to implement a simple main function. We will run it for testing, but Flink will also call it when our application is executed in a Flink cluster:


public class TopTweet {
    public static void main(String[] args) throws Exception {
        // Create Flink Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    }
}

Extracting hashtags

As you may remember, when we write a Flink application we need to define a processing pipeline, and first and foremost we need to add a source for our data. Twitter only allows reading the stream of tweets with Twitter application credentials, so we need to create an application and copy its credentials:

Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "...");
props.setProperty(TwitterSource.CONSUMER_SECRET, "...");
props.setProperty(TwitterSource.TOKEN, "...");
props.setProperty(TwitterSource.TOKEN_SECRET, "...");

With these credentials we can create a source and register it in the application environment to get a stream of data:

DataStream<String> streamSource = env.addSource(new TwitterSource(props));

DataStream is a class that is used to define a sequence of data processing steps Flink has to perform. Here it gives us access to a stream of JSON strings in the Twitter API format. To convert it into a stream of hashtags, we use the flatMap function, which converts a single input element into one, multiple, or no output elements:

env.addSource(new TwitterSource(props))
    .flatMap(new ExtractHashTags())

ExtractHashTags implements the FlatMapFunction interface, which has a single method, flatMap, that is called for every tweet (passed as the first argument). The second argument, collector, is used to output values to the next step of the transformation pipeline. In this case, we parse every tweet’s JSON, extract a list of hashtags from it, and pass these hashtags to the next pipeline operator.

private static class ExtractHashTags implements FlatMapFunction<String, Tuple2<String, Integer>> {

    private static final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void flatMap(String tweetJsonStr, Collector<Tuple2<String, Integer>> collector) throws Exception {
        JsonNode tweetJson = mapper.readTree(tweetJsonStr);
        JsonNode entities = tweetJson.get("entities");
        if (entities == null) return;

        JsonNode hashtags = entities.get("hashtags");
        if (hashtags == null) return;

        // Iterate over hashtags in a tweet
        for (Iterator<JsonNode> iter = hashtags.getElements(); iter.hasNext();) {
            JsonNode node = iter.next();
            String hashtag = node.get("text").getTextValue();

            if (hashtag.matches("\\w+")) {
                // Pass a hashtag to the next stage in Flink pipeline
                collector.collect(new Tuple2<>(hashtag, 1));
            }
        }
    }
}

Notice that we output Tuple2 instances with two fields: a hashtag name and the number one. We will need this number later, when we count how many times each hashtag was used per time window.

Get number of hashtag occurrences

Now we can count how many times each hashtag was encountered during a five-minute period. This is pretty straightforward. First, we need to define a key in our stream using the keyBy function. It is somewhat similar to a “group by” operation and means that the following operations will be performed on individual groups of data. The only parameter of the keyBy function specifies which field in each input element should be used as a key. In this case, we pass 0, since the first element of each tuple is the name of a hashtag.

Since the stream of tweets is unbounded, we need to specify a processing duration. For this, we use the timeWindow function, which specifies that all processing should be done on consecutive five-minute intervals.

env.addSource(new TwitterSource(props))
    .flatMap(new ExtractHashTags())
    // Group by hashtag name
    .keyBy(0)
    // Following transformations are performed on a group of tuples with the 
    // same hashtag
    // Work in 5 minutes intervals
    .timeWindow(Time.minutes(5))
    // Get the number of occurrences of each hashtag
    .sum(1)

The last function defines how to process elements in each five-minute window. Since we want to count how many times each hashtag was used, and since the second element of every tuple is equal to 1, we can use the sum function.

Find top hashtag

At this stage, we have a stream of hashtags with the number of times every hashtag was encountered per five-minute interval. Now we need to find the top hashtag in every time window. First, we call timeWindowAll, which allows us to process all elements of the stream in a specified time window.

Flink has several versions of the max function that allow finding the maximum element by a field or a set of fields, but just to make it more interesting we will define our own function that finds the top hashtag. To do this, we can use the apply function and pass it a function that finds the top hashtag.

The last step in our pipeline is to output the data somewhere. Here we use the print function, which simply prints the output to stdout, but we could alternatively send the output to a file or an external system like Kafka or HDFS.

env.addSource(new TwitterSource(props))
    .flatMap(new ExtractHashTags())
    .keyBy(0)
    .timeWindow(Time.minutes(5))
    .sum(1)
    // Process 5 minutes of hashtags data
    .timeWindowAll(Time.minutes(5))
    // Find top hashtag
    .apply(new GetTopHashTag())
    // Print result to stdout
    .print();
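
For comparison, the custom function could be replaced with the built-in maxBy mentioned above. This is only an alternative sketch, not the pipeline used in the rest of this article, and it emits the winning Tuple2 rather than a TweetsCount POJO:

env.addSource(new TwitterSource(props))
    .flatMap(new ExtractHashTags())
    .keyBy(0)
    .timeWindow(Time.minutes(5))
    .sum(1)
    .timeWindowAll(Time.minutes(5))
    // Pick the tuple with the largest count (field 1) in each window
    .maxBy(1)
    .print();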

Now let’s see how we can define a function that finds the top hashtag. To do this, we need to implement the AllWindowFunction interface, which has a single method, apply, with three arguments:

  • window – contains information about the window we are processing, such as the window’s start and end timestamps
  • stream elements – an iterator over all elements of the stream in the given window
  • out – a collector instance, similar to the one in the flatMap function, that we use to generate output elements. We will use it to produce one element per window (the top hashtag), but it can also produce multiple elements, e.g. if we need to find the top N elements.

Notice that in this case we do not produce a Tuple as in the previous examples. Instead, we produce a POJO instance. We could output a Tuple with four fields, and it would work just fine, but I wanted to show that Flink can also work with POJOs and is not limited to Tuples.

private static class GetTopHashTag implements AllWindowFunction<Tuple2<String,Integer>, TweetsCount, TimeWindow> {
    @Override
    public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> hashTags, Collector<TweetsCount> out) throws Exception {
        Tuple2<String, Integer> topHashTag = new Tuple2<>("", 0);
        // Find top hashtag
        for (Tuple2<String, Integer> hashTag : hashTags) {
            if (hashTag.f1 > topHashTag.f1) {
                topHashTag = hashTag;
            }
        }
        // Output top hashtag
        out.collect(new TweetsCount(window.getStart(), window.getEnd(), topHashTag.f0, topHashTag.f1));
    }
}
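
The TweetsCount class itself is not shown in this article, so here is a minimal sketch of what it could look like, assuming only the fields implied by the constructor call above and the sample output below (Flink treats a class as a POJO if it has a public no-argument constructor and public fields or getters/setters):

public class TweetsCount {
    public long windowStart;
    public long windowEnd;
    public String hashTag;
    public int count;

    // A public no-argument constructor is required for Flink POJO types
    public TweetsCount() {}

    public TweetsCount(long windowStart, long windowEnd, String hashTag, int count) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.hashTag = hashTag;
        this.count = count;
    }

    @Override
    public String toString() {
        return "TweetsCount{windowStart=" + new java.util.Date(windowStart) +
                ", windowEnd=" + new java.util.Date(windowEnd) +
                ", hashTag='" + hashTag + "', count=" + count + "}";
    }
}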

The stream processing won’t run just yet, since all the previous code only builds an execution plan. Now we need to instruct Flink that the execution plan is finished and that it can start executing it. To do this, we simply call the execute method on the StreamExecutionEnvironment instance:

env.addSource(new TwitterSource(props))
    .flatMap(new ExtractHashTags())
    ...
    .print();

env.execute();
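
For local testing, it is enough to run the main method from the IDE. To submit the application to a Flink cluster instead, the usual flow is to package it with Maven and submit the resulting jar with the flink command-line client; the jar name below simply follows the artifactId and version from the archetype command earlier:

# Build the application jar
mvn clean package

# Submit it to a running Flink cluster
bin/flink run target/tweets-flink-1.0.jar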

Now if we run the application, we will see results like this:

1> TweetsCount{windowStart=Sat Dec 17 15:15:00 GMT 2016, windowEnd=Sat Dec 17 15:20:00 GMT 2016, hashTag='AFFSuzukiCup', count=93}
2> TweetsCount{windowStart=Sat Dec 17 15:20:00 GMT 2016, windowEnd=Sat Dec 17 15:25:00 GMT 2016, hashTag='AFFSuzukiCup', count=78}
...

More information

If you want to know more about Apache Flink, you can take a look at my Pluralsight course, where I cover Apache Flink in more detail: Understanding Apache Flink

